package com.chen.rabbitmq.four;

import com.chen.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * @author Y·C
 * @version 1.0.0
 * @ClassName ConfirmMessage.java
 * @Description 发布确认模式
 *                使用的时间 比较哪种确认方式是最好的
 *                  1.单个确认
 *                  2.批量确认
 *                  3.异步批量确认
 * @createTime 2023年05月08日 15:55:00
 */
public class ConfirmMessage {

    //批量发送消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //1.单个确认
//        ConfirmMessage.publishMessageIndividually(); //发布1000单独确认消息，耗时16722ms
        //2.批量确认
//        ConfirmMessage.publishMessageBatch(); //发布1000批量发布确认消息，耗时207ms
        //3.异步批量确认
         ConfirmMessage.publishMessageAsync(); //发布1000异步确认发布确认消息，耗时41ms
    }

    //单个确认
    public static void publishMessageIndividually() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个消息就马上进行确认
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("消息发送成功");
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT + "单独确认消息，耗时"+(end-begin)+"ms");
    }

    //批量发布确认
    public static void publishMessageBatch() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        /**
         * 队列的声明
         * 1.队列名称
         * 2.队列里面的消息是否持久化（磁盘）默认情况下消息存储在内存中
         * 3.该队列是否提供一个消费者进行消费 是否进行消息共享，true可以多个消费者消费false：只能一个消费者消费
         * 4.是否自动删除 最后一个消费者断开连接以后  该队列是否自动删除 true自动删除 false 不自动删除
         * 5.其他参数
         * */
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;

        //批量发送消息 批量发布确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //判断达到100条消息的时候 ， 批量确认一次
            if ((i+1)%batchSize == 0){
                //发布确认
                channel.confirmSelect();
            }
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "批量发布确认消息，耗时" + (end-begin) + "ms");

    }

    //异步发布确认
    public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        /**
         * 队列的声明
         * 1.队列名称
         * 2.队列里面的消息是否持久化（磁盘）默认情况下消息存储在内存中
         * 3.该队列是否提供一个消费者进行消费 是否进行消息共享，true可以多个消费者消费false：只能一个消费者消费
         * 4.是否自动删除 最后一个消费者断开连接以后  该队列是否自动删除 true自动删除 false 不自动删除
         * 5.其他参数
         * */
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel.confirmSelect();


        /**
         * 线程安全有序的一个哈希表 适用于高并发的情况下
         * 1.轻松的将序号与消息进行关联
         * 2.轻松批量删除条目 只要给到序号
         * 3.支持高并发（多线程）
         * */
        ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();


        //消息确认成功  回调函数
        ConfirmCallback ackCallback = (deliveryTag, multiple) ->{
            if (multiple){
                //2.删除到已经确认的消息 剩下的就是未确认的消息
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            }else {
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息" + deliveryTag);
        } ;
        //消息确认失败 回调函数
        /**
         * 1.消息的标记
         * 2.是否为批量确认
         * */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            //3.打印一下未确认的消息都有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是:"+message+"::::未确认的消息tag" + deliveryTag);
        };

        //批量消息的监听器 监听那些消息成功了 那些消息失败了
        /**
         * 异步监听
         * 1.监听哪些消息成功了
         * 2.监听哪些消息失败了
         * */
        channel.addConfirmListener(ackCallback,nackCallback);


        //开始时间
        long begin = System.currentTimeMillis();

        //批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            channel.basicPublish("",queueName,null,message.getBytes());
            //1.在此处记录下所有发送的消息 消息的总和
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "异步确认发布确认消息，耗时" + (end-begin) + "ms");
    }

}
